#include "config.h"

#include <sys/mman.h>
#include <libaio.h>
#include <errno.h>
#include <ctype.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "job_dock.h"
#include "core.h"
#include "sysy_lib.h"
#include "clock.h"
#include "clock_mem.h"
#include "clock_merge.h"
#include "../../schedule/cpuset.h"
#include "squeue.h"
#include "disk.h"
#include "net_global.h"
#include "variable.h"
#include "dbg.h"

#define MD_RECYCLE_END (1)
#define MD_RECYCLE_BEGIN (10)

/* Checksum of an op record, skipping its leading 32-bit crc field.
 * Fixes vs. old version: (char *) cast avoids non-standard arithmetic
 * on void *, the macro argument is parenthesized inside sizeof, and
 * the trailing semicolon is dropped so `crc = MD_CRC(op);` expands to
 * exactly one statement. */
#define MD_CRC(__op__) \
        crc32_sum((char *)(__op__) + sizeof(uint32_t), \
                  sizeof(*(__op__)) - sizeof(uint32_t))

#pragma pack(8)

/* One on-disk log record: either a vclock update or a removal for a
 * chunk.  The layout is fixed on disk (see the
 * YASSERT(sizeof(op_t) == MD_OP_SIZE) check in __clock_init); do not
 * reorder or resize fields. */
typedef struct {
#if 0
        uint32_t used;
        uint16_t len;
        uint8_t op;
        uint8_t dirty;
#endif
        char used;              /* 0 = free slot, nonzero = record present */
        char op;                /* MD_OP_VCLOCK or MD_OP_REMOVE */
        char dirty;             /* dirty flag forwarded to clock_mem */
        char __unused1__;       /* explicit padding, keeps layout stable */
        uint8_t __unused2__;
        
        chkid_t id;             /* chunk this record applies to */
        vclock_t vclock;        /* payload, meaningful for MD_OP_VCLOCK */
} op_t;

#pragma pack()

/* Per-hash-bucket append log.  The current segment file (number `max`)
 * is mmap()ed at `addr`; `offset` is the append position inside it. */
typedef struct {
        sy_spinlock_t lock;     /* serializes __clock_submit() */
        int fd;                 /* fd of the active segment file */
        int idx;                /* bucket index within CLOCK_HASH */
        int offset;             /* append offset; MD_MAX means segment full */
        void *addr;             /* mmap of the active segment (MD_MAX bytes) */
        const char *log;        /* log home dir (borrowed from logic_clock_t) */
        const char *data;       /* data home dir (borrowed from logic_clock_t) */
        int recycle;            /* nonzero while a merge worker is running */
        uint64_t min;           /* oldest segment number still on disk */
        uint64_t max;           /* newest (active) segment number */
        clock_mem_t *merge;     /* merge state filled by clock_merge() */
        sem_t recycle_sem;      /* signals that the merge worker has started */
} clock_queue_t;

/* Top-level clock instance: the in-memory vclock table plus CLOCK_HASH
 * persistent append logs.  One shared "public" instance exists, plus
 * one private instance per core (see __clock_init_core). */
typedef struct {
        int inited;                      /* set once init completes; gates the API */
        int thread;                      /* owning core thread for private instances */
        clock_mem_t *clock_mem;          /* in-memory vclock table */
        clock_queue_t queue[CLOCK_HASH]; /* per-bucket persistent logs */
        char log[MAX_PATH_LEN];          /* log directory of this instance */
        char data[MAX_PATH_LEN];         /* data directory of this instance */
} logic_clock_t;

/* Accumulator for __clock_iterator: min/max numeric segment file name
 * observed while scanning a bucket directory. */
typedef struct {
        uint64_t max;
        uint64_t min;
} arg_t;

//static __thread logic_clock_t *__logic_clock__;
//static __thread clock_mem_t *clock_mem;
static char *__log__;                  /* log root dir, allocated in clock_init() */
static char *__data__;                 /* data root dir, allocated in clock_init() */
static logic_clock_t *__logic_clock__; /* shared "public" instance */
int clock_table_size = 0;              /* per-core table sizing, set in clock_init() */

/**
 * Directory-scan callback: track the numeric range of segment files.
 *
 * Segment files are named by their decimal number; "merge"/"backup"
 * entries are skipped.  Anything else is treated as corruption (EIO).
 *
 * @param parent  unused
 * @param name    directory entry name
 * @param _arg    arg_t accumulator; min/max updated in place
 * @return 0 on success, EIO on an unexpected entry
 */
static int __clock_iterator(const char *parent, const char *name, void *_arg)
{
        int ret;
        uint64_t idx;
        arg_t *arg;

        (void) parent;
        arg = _arg;

        /* cast to unsigned char: passing a plain (possibly negative)
         * char to isdigit() is undefined behavior */
        if (isdigit((unsigned char)name[0])) {
                /* strtoull covers the full uint64_t range; atol would
                 * truncate segment numbers above LONG_MAX */
                idx = strtoull(name, NULL, 10);

                arg->min = idx < arg->min ? idx : arg->min;
                arg->max = idx > arg->max ? idx : arg->max;
        } else if (memcmp(name, "merge", strlen("merge")) == 0
                   || memcmp(name, "backup", strlen("backup")) == 0) {
                DINFO("skip %s\n", name);
        } else {
                ret = EIO;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/* Find the append position in the active segment: scan for the first
 * op_t slot whose `used` byte is 0.  If no free slot exists the segment
 * is full and offset is set to the MD_MAX sentinel, which makes the
 * next __clock_submit() rotate to a new segment. */
static int __clock_offset(clock_queue_t *queue)
{
        int ret, fd, left, offset, done = 0, i;
        char buf[MD_BUFFER_SIZE];
        op_t *op, *array;

        /* records must tile the read buffer exactly */
        YASSERT(MD_BUFFER_SIZE % sizeof(op_t) == 0);

        fd = queue->fd;

        left = MD_MAX;
        offset = 0;
        while (left) {
                ret = _pread(fd, buf, MD_BUFFER_SIZE, offset);
                if (ret < 0) {
                        ret = -ret;
                        GOTO(err_ret, ret);
                }

                /* the segment was ftruncate()d to MD_MAX in
                 * __clock_open, so a short read is not expected */
                YASSERT(ret == MD_BUFFER_SIZE);

                array = (void *)buf;
                for (i = 0; i < (int)(MD_BUFFER_SIZE / sizeof(op_t)); i++) {
                        op = &array[i];

                        if (op->used == 0) {
                                queue->offset = offset +  sizeof(op_t) * i;
                                done = 1;
                                break;
                        }
                }

                left -= MD_BUFFER_SIZE;
                offset += MD_BUFFER_SIZE;

                if (done)
                        break;
        }

        if (done == 0) {
                /* segment completely full: MD_MAX acts as a sentinel */
                DWARN("not found\n");
                queue->offset = MD_MAX;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Replay one log segment: read it in MD_BUFFER_SIZE chunks and invoke
 * `callback` on every used op_t record, stopping at the first free slot.
 *
 * @param path      segment file to read
 * @param size      number of bytes to scan (MD_MAX, or the append
 *                  offset for the active segment)
 * @param callback  invoked per record; nonzero return aborts the replay
 * @param ctx       opaque context forwarded to the callback
 * @return 0 on success, positive errno-style code on failure
 */
static int __clock_load_seg(const char *path, int size, int (*callback)(const op_t *op, void *ctx), void *ctx)
{
        int ret, fd, left, offset, cp, i;
        char buf[MD_BUFFER_SIZE];
        op_t *op, *array;
        //uint32_t crc;

        fd = open(path, O_RDONLY);
        if (fd < 0) {
                ret = errno;
                DWARN("open %s, ret:%d\n", path, ret);
                GOTO(err_ret, ret);
        }

        left = size;
        offset = 0;
        while (left) {
                cp = left < MD_BUFFER_SIZE ? left : MD_BUFFER_SIZE;
                ret = _pread(fd, buf, cp, offset);
                if (ret < 0) {
                        ret = -ret;
                        GOTO(err_fd, ret);
                }

                YASSERT(ret == cp);

                array = (void *)buf;
                for (i = 0; i < (int)(cp / sizeof(op_t)); i++) {
                        op = &array[i];
                        /* records are appended contiguously: the first
                         * free slot terminates the segment */
                        if (op->used == 0)
                                break;

                        //crc = MD_CRC(op);
                        //YASSERT(op->crc);
                        //YASSERT(op->crc == crc);

                        ret = callback(op, ctx);
                        if (unlikely(ret))
                                GOTO(err_fd, ret);  /* was err_ret: leaked fd */
                }

                left -= cp;
                offset += cp;
        }

        close(fd);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

/* Replay callback for clock_load(): apply one log record to the
 * in-memory table passed as ctx.  Unknown op codes are fatal. */
static int __clock_load_op(const op_t *op, void *ctx)
{
        int ret;
        clock_mem_t *mem = ctx;

        if (op->op == MD_OP_VCLOCK) {
                ret = clock_mem_vclock_set(mem, &op->id, &op->vclock, op->dirty);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (op->op == MD_OP_REMOVE) {
                ret = clock_mem_remove(mem, &op->id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                YASSERT(0);
        }

        return 0;
err_ret:
        return ret;
}

/* Public wrapper: replay one segment file into an in-memory table. */
int clock_load(const char *path, int size, clock_mem_t *__clock_mem)
{
        int ret = __clock_load_seg(path, size, __clock_load_op, __clock_mem);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/* Unmap and close the active segment.  msync()/fsync() are
 * intentionally skipped (see commented lines): durability is traded
 * for submit speed. */
static void __clock_close(clock_queue_t *queue)
{
        //msync(queue->addr, MD_MAX, 0);
        munmap(queue->addr, MD_MAX);
        //fsync(queue->fd);
        close(queue->fd);
}

/**
 * Open (creating if absent) segment `file` of this bucket, size it to
 * MD_MAX, and mmap it for appending.  On success queue->fd/addr are
 * updated; on failure they are left untouched and the fd is closed
 * (the old code leaked it on the ftruncate/mmap error paths).
 *
 * @return 0 on success, positive errno on failure
 */
static int __clock_open(clock_queue_t *queue, uint64_t file, const char *home)
{
        int ret, fd;
        void *addr;
        char path[MAX_PATH_LEN];

        /* bound by the buffer's real size, not MAX_LINE_LEN */
        snprintf(path, sizeof(path), "%s/%d/%llu", home, queue->idx, (LLU)file);

        fd = _open(path, O_RDWR | O_CREAT, 0644);
        if (fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = ftruncate(fd, MD_MAX);
        if (ret < 0) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        addr = mmap(0, MD_MAX, PROT_WRITE, MAP_SHARED, fd, 0);
        if (addr == MAP_FAILED) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        queue->fd = fd;
        queue->addr = addr;

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

/**
 * Discover this bucket's segment range on disk, open the newest
 * segment and locate its append offset, then replay every segment
 * [min, max] into the in-memory table.
 *
 * @param queue         bucket to load (queue->idx must be set)
 * @param __clock_mem   in-memory table to replay into
 * @param home          log root directory
 * @return 0 on success, positive errno-style code on failure
 */
static int __clock_load(clock_queue_t *queue, clock_mem_t *__clock_mem, const char *home)
{
        int ret, size;
        char path[MAX_PATH_LEN];
        arg_t arg;
        uint64_t i;

        /* bound by the buffer's real size (old code used MAX_LINE_LEN,
         * which need not match path[MAX_PATH_LEN]) */
        snprintf(path, sizeof(path), "%s/%d/", home, queue->idx);
        ret = path_validate(path, YLIB_ISDIR, YLIB_DIRCREATE);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_ret, ret);
        }

        arg.max = 0;
        arg.min = UINT64_MAX;
        ret = _dir_iterator(path, __clock_iterator, &arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* empty directory: start from segment 0 */
        if (arg.max == 0 && arg.min == UINT64_MAX)
                arg.min = 0;

        DINFO("log [%llu, %llu] @ %s\n", (LLU)arg.min, (LLU)arg.max, path);

        queue->max = arg.max;
        queue->min = arg.min;

        ret = __clock_open(queue, queue->max, home);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __clock_offset(queue);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = queue->min; i <= queue->max; i++) {
                snprintf(path, sizeof(path), "%s/%d/%llu", home, queue->idx, (LLU)i);

                /* only the active segment may be partially filled */
                size = (i == queue->max) ? queue->offset : MD_MAX;

                ret = clock_load(path, size, __clock_mem);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Initialize one log queue's lock, semaphore and bookkeeping.
 * Does not touch the on-disk state (see __clock_load for that).
 *
 * @return 0 on success, positive errno-style code on failure
 */
static int __clock_queue_init(clock_queue_t *queue, int idx)
{
        int ret;

        ret = sy_spin_init(&queue->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* sem_init() returns -1 and sets errno on failure; the old
         * code propagated the raw -1 as the error code */
        if (sem_init(&queue->recycle_sem, 0, 0) == -1) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        queue->idx = idx;
        queue->merge = NULL;

        return 0;
err_ret:
        return ret;
}

/**
 * Append `size` bytes to the bucket's mmap'ed segment, rotating to a
 * new segment file first if the current one is full.
 * Caller must hold queue->lock.
 *
 * @return 0 on success, positive errno-style code on failure
 */
static int __clock_submit(clock_queue_t *queue, const char *buf, int size, const char *home)
{
        int ret;
        ANALYSIS_BEGIN(0);

        YASSERT(queue->offset <= MD_MAX);

        /* MD_MAX sentinel: active segment is full, rotate */
        if (queue->offset == MD_MAX) {
                __clock_close(queue);

                queue->max++;
                ret = __clock_open(queue, queue->max, home);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                queue->offset = 0;
        }

        YASSERT(queue->offset + size <= MD_MAX);

        /* (char *) cast: arithmetic on void * is a GNU extension,
         * not standard C */
        memcpy((char *)queue->addr + queue->offset, buf, size);
        queue->offset += size;

        ANALYSIS_END(0, 1000 * 100, NULL);

        return 0;
err_ret:
        return ret;
}

/* Detached merge thread: fold segments [min, max-1] of one queue into
 * the merged data via clock_merge(), then advance queue->min.  The
 * recycle flag is set before sem_post() so __clock_merge__() cannot
 * start a second worker for the same queue while this one runs.
 * NOTE(review): recycle/min/max are plain (non-atomic) fields shared
 * between this thread and the submitters — confirm the intended
 * memory-ordering guarantees. */
static void *__clock_recycle_worker(void *_args)
{
        int ret, max;
        clock_queue_t *queue;        

        queue =  _args;

        queue->recycle = 1;
        sem_post(&queue->recycle_sem);  /* release __clock_merge__() */

        ANALYSIS_BEGIN(0);

        YASSERT(queue->max > queue->min);

        /* snapshot max; the active segment (max itself) is excluded
         * from the merge because it is still being appended to */
        max = queue->max;
        ret = clock_merge(queue->log, queue->data, queue->idx, queue->min, max - 1, &queue->merge);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        queue->min = max;

        ANALYSIS_END(0, IO_WARN, NULL);

        queue->recycle = 0;

        return NULL;
err_ret:
        /* merge failure is fatal: no recovery path implemented */
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

/**
 * Kick off a detached recycle/merge worker for one queue if enough
 * segments (MD_RECYCLE_BEGIN) have accumulated and no worker is
 * already running.  Blocks only until the worker has started.
 *
 * @return 0 on success (including "nothing to do"), errno-style
 *         code on failure
 */
static int __clock_merge__(clock_queue_t *queue)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        /* not enough backlog, or a previous merge is still running */
        if (queue->max - queue->min < MD_RECYCLE_BEGIN || queue->recycle) {
                return 0;
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __clock_recycle_worker, queue);
        (void) pthread_attr_destroy(&ta);  /* was never destroyed before */
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* wait until the worker has set queue->recycle, so callers in
         * the same sweep don't spawn a duplicate worker */
        if (sem_wait(&queue->recycle_sem) == -1) {
                /* sem_wait() reports failure via errno, not its return */
                ret = errno;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/* Sweep all CLOCK_HASH buckets, starting a merge worker for any
 * queue whose backlog is large enough. */
static int __clock_merge(logic_clock_t *logic_clock)
{
        int ret;
        int i;

        for (i = 0; i < CLOCK_HASH; i++) {
                ret = __clock_merge__(&logic_clock->queue[i]);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/* Background thread (one per logic_clock_t): every 3 seconds sweep the
 * queues and trigger merges.  Never returns; a merge failure aborts
 * the process via UNIMPLEMENTED(__DUMP__). */
static void *__clock_worker(void *_args)
{
        int ret;
        logic_clock_t *logic_clock = _args;

        while (1) {
                sleep(3);

                ret = __clock_merge(logic_clock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        /* unreachable: the loop above never exits normally */
        YASSERT(0);

        return NULL;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

/**
 * Allocate and initialize one logic_clock_t instance: the in-memory
 * table, the merge subsystem, and CLOCK_HASH log queues (each replayed
 * from disk), then spawn the background merge worker.
 *
 * @param _logic_clock  out: the new instance (ownership transferred)
 * @param log           log directory (copied into the instance)
 * @param data          data directory (copied into the instance)
 * @param private       forwarded to clock_mem_init()
 * @return 0 on success, errno-style code on failure
 *
 * NOTE(review): logic_clock is leaked on the error paths below; both
 * callers currently abort on failure, so this is latent only.
 */
static int __clock_init(logic_clock_t **_logic_clock, const char *log, const char *data, int private)
{
        int ret, i;
        logic_clock_t *logic_clock;

        /* the on-disk record layout must match the compiled struct */
        YASSERT(sizeof(op_t) == MD_OP_SIZE);

        ANALYSIS_BEGIN(0);

        ret = ymalloc((void **)&logic_clock, sizeof(*logic_clock));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(logic_clock, 0x0, sizeof(*logic_clock));

        /* bounded copies: strcpy() could overflow on an oversized path */
        snprintf(logic_clock->log, sizeof(logic_clock->log), "%s", log);
        snprintf(logic_clock->data, sizeof(logic_clock->data), "%s", data);

        ret = clock_mem_init(&logic_clock->clock_mem, "clock", private);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = clock_merge_init(logic_clock->data, logic_clock->clock_mem);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < CLOCK_HASH; i++) {
                ret = __clock_queue_init(&logic_clock->queue[i], i);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* replay this bucket's persisted log into clock_mem */
                ret = __clock_load(&logic_clock->queue[i], logic_clock->clock_mem, logic_clock->log);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                logic_clock->queue[i].log = logic_clock->log;
                logic_clock->queue[i].data = logic_clock->data;
        }

        DINFO("home %s,%s size %lu\n", logic_clock->log, logic_clock->data, sizeof(*logic_clock));

        logic_clock->inited = 1;

        ANALYSIS_END(0, IO_WARN, NULL);

        /* background merge/recycle thread; a startup merge here was
         * removed deliberately for faster start */
        ret = sy_thread_create2(__clock_worker, logic_clock, "__clock_worker");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_logic_clock = logic_clock;

        return 0;
err_ret:
        return ret;
}

inline static  logic_clock_t *__clock_get(const chkid_t *id)
{
        logic_clock_t *logic_clock;

        if (likely(id->type == __RAW_CHUNK__)) {
                logic_clock = variable_get(VARIABLE_CLOCK);
                YASSERT(logic_clock);
                YASSERT(logic_clock->thread == variable_thread());
        } else {
                logic_clock = __logic_clock__;
        }

        return logic_clock;
}

/**
 * Record a vclock for a chunk: append an op record to the per-bucket
 * log (when logging applies to this chunk type), then update the
 * in-memory table.
 *
 * Returns 0, EAGAIN before init completes, or a submit error.  An
 * in-memory update failure is fatal (process exits or dumps).
 */
int clock_set(const chkid_t *id, const vclock_t *vclock, int dirty)
{
        int ret;
        op_t op;
        clock_queue_t *queue;
        logic_clock_t *logic_clock = __clock_get(id);

#ifdef HAVE_STATIC_ASSERT
        static_assert(MD_OP_SIZE == sizeof(op), "clock");
#endif

        if (unlikely(logic_clock->inited == 0)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        queue = &logic_clock->queue[id->id % CLOCK_HASH];

        op.id = *id;
        op.vclock = *vclock;
        op.dirty = dirty;
        op.op = MD_OP_VCLOCK;
        op.used = 1;

        /* raw chunks are logged only when gloconf.clock is enabled;
         * all other chunk types are always logged */
        if (gloconf.clock || id->type != __RAW_CHUNK__) {
                ret = sy_spin_lock(&queue->lock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __clock_submit(queue, (void *)&op, sizeof(op), logic_clock->log);
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                sy_spin_unlock(&queue->lock);
        }

        ret = clock_mem_vclock_set(logic_clock->clock_mem, id, vclock, dirty);
        if (unlikely(ret)) {
                /* shutdown/stale table: restart rather than continue
                 * with log and memory out of sync */
                if (ret == ESHUTDOWN || ret == ESTALE) {
                        EXIT(EAGAIN);
                } else
                        UNIMPLEMENTED(__DUMP__);
        }

        return 0;
err_lock:
        sy_spin_unlock(&queue->lock);
err_ret:
        return ret;
}

/* Look up a chunk's vclock and dirty flag in the in-memory table.
 * Returns EAGAIN until initialization has completed. */
int clock_get(const chkid_t *id, vclock_t *_vclock, int *dirty)
{
        int ret;
        logic_clock_t *lc = __clock_get(id);

        if (unlikely(lc->inited == 0)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = clock_mem_vclock_get(lc->clock_mem, id, _vclock, dirty);
        if (unlikely(ret))
                goto err_ret;

        return 0;
err_ret:
        return ret;
}

/**
 * Remove a chunk's clock record: drop it from the in-memory table,
 * then append an MD_OP_REMOVE record to the per-bucket log.
 *
 * Returns 0, EAGAIN before init completes, or an error from either
 * step.
 * NOTE(review): unlike clock_set(), the log append here is
 * unconditional (no gloconf.clock gate) and happens AFTER the memory
 * update — confirm this ordering is intended.
 */
int clock_remove(const chkid_t *id)
{
        int ret;
        op_t op;
        clock_queue_t *queue;
        logic_clock_t *logic_clock = __clock_get(id);

        if (unlikely(logic_clock->inited == 0)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }
        
        queue = &logic_clock->queue[id->id % CLOCK_HASH];

        /* zero the whole record so padding bytes hit disk as zeros */
        memset(&op, 0x0, sizeof(op));
        op.id = *id;
        op.op = MD_OP_REMOVE;
        op.used = 1;//MD_CRC(&op);

        ret = clock_mem_remove(logic_clock->clock_mem, id);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = sy_spin_lock(&queue->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ret = __clock_submit(queue, (void *)&op, sizeof(op), logic_clock->log);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        sy_spin_unlock(&queue->lock);

        return 0;
err_lock:
        sy_spin_unlock(&queue->lock);
err_ret:
        return ret;
}

/* Per-core init hook (registered via core_init_register): create this
 * core's private clock instance under <root>/<core-hash> and publish
 * it in VARIABLE_CLOCK.  Any failure is fatal. */
inline static  void __clock_init_core(void *arg)
{
        int ret;
        logic_clock_t *lc;
        char log[MAX_PATH_LEN], data[MAX_PATH_LEN];
        core_t *core = core_self();

        (void) arg;

        /* must not be initialized twice on the same core */
        lc = variable_get(VARIABLE_CLOCK);
        YASSERT(lc == NULL);

        DINFO("init core[%u]\n", core->hash);

        snprintf(log, sizeof(log), "%s/%u", __log__, core->hash);
        snprintf(data, sizeof(data), "%s/%u", __data__, core->hash);

        ret = __clock_init(&lc, log, data, 1);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        lc->thread = variable_thread();
        variable_set(VARIABLE_CLOCK, lc);
}

/**
 * Module entry point: size the per-core tables, validate/rebuild the
 * on-disk layout if the stored cpuset hash changed, initialize the
 * shared "public" instance, and register the per-core init hook.
 *
 * @param max_chunk  expected chunk count, used to size the tables
 * @return 0 on success, errno-style code on failure
 */
int clock_init(uint64_t max_chunk)
{
        int ret, hash, cpuset;
        char tmp[MAX_PATH_LEN], value[MD_BUFFER_SIZE], log[MAX_PATH_LEN], data[MAX_PATH_LEN];
        char log_prefix[MAX_PATH_LEN], data_prefix[MAX_PATH_LEN];
        
        clock_table_size = ((max_chunk / cpuset_useable()) / CLOCK_HASH) * 2;
        
        /* "vclcok" is a historical typo but also the on-disk directory
         * name: do not "fix" it without a data-migration path */
        snprintf(log_prefix, MAX_PATH_LEN, "%s/vclcok", SHM_ROOT);

        if (gloconf.clock_mem) {
                snprintf(data_prefix, MAX_PATH_LEN, "%s/vclcok", SHM_ROOT);
        } else {
                snprintf(data_prefix, MAX_PATH_LEN, "%s/data/vclcok", gloconf.home);
        }

        DINFO("clock init, log %s, data %s\n", log_prefix, data_prefix);

        /* default to an empty string so a missing hash file yields
         * hash == 0 (never a valid cpuset) and forces the rebuild
         * below; the old code ran atoi() on an uninitialized buffer
         * in the ENOENT case — undefined behavior */
        value[0] = '\0';

        snprintf(tmp, MAX_PATH_LEN, "%s/hash", log_prefix);
        ret = _get_text(tmp, value, MD_BUFFER_SIZE);
        if (ret < 0) {
                ret = -ret;
                if (ret == ENOENT) {
                        DINFO("clock init\n");
                        ret = _delete_path(data_prefix);
                        if (ret)
                                GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

        hash = atoi(value);
        cpuset = cpuset_useable();
        if (hash != cpuset) {
                /* core count changed: the hashed layout is invalid */
                DWARN("clock rebuild %u to %u\n", hash, cpuset);

                ret = _delete_path(log_prefix);
                if (ret)
                        GOTO(err_ret, ret);

                ret = _delete_path(data_prefix);
                if (ret)
                        GOTO(err_ret, ret);
        }

        ret = path_validate(log_prefix, YLIB_ISDIR, YLIB_DIRCREATE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = path_validate(data_prefix, YLIB_ISDIR, YLIB_DIRCREATE);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        /* bound by the actual buffer: the old code passed MAX_PATH_LEN
         * for value[MD_BUFFER_SIZE] */
        snprintf(value, sizeof(value), "%u", cpuset);
        ret = _set_text(tmp, value, strlen(value), O_CREAT);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&__log__, strlen(log_prefix) + 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        strcpy(__log__, log_prefix);

        ret = ymalloc((void **)&__data__, strlen(data_prefix) + 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        strcpy(__data__, data_prefix);
        
        /* the shared instance lives under <root>/public; per-core
         * instances are created later by __clock_init_core */
        snprintf(log, MAX_PATH_LEN, "%s/public", __log__);
        snprintf(data, MAX_PATH_LEN, "%s/public", __data__);
        ret = __clock_init(&__logic_clock__, log, data, 0);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = core_init_register(__clock_init_core, NULL, "clock_register");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
